package com.lsh.ofc.worker.kafka;

import com.alibaba.fastjson.JSON;
import com.dmall.dmg.sdk.core.consumer.ConsumerContext;
import com.dmall.dmg.sdk.core.consumer.ConsumerRecord;
import com.dmall.dmg.sdk.core.consumer.MessageHandler;
import com.lsh.base.common.json.JsonUtils;
import com.lsh.ofc.core.constant.Constants;
import com.lsh.ofc.core.handler.CreateSumSoObdHandler;
import com.lsh.ofc.core.model.DmgSoObdVo;
import com.lsh.ofc.core.redis.RedisTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.text.MessageFormat;
import java.util.List;

/**
 * @author peter
 */
@Component
@Slf4j
public class WumartKafkaConsumerService implements MessageHandler {

    @Autowired
    private CreateSumSoObdHandler sumSoObdHandler;

    @Autowired
    private RedisTemplate redisTemplate;

    @Override
    public void onMessage(List<ConsumerRecord> records, ConsumerContext consumerContext) {
        for (ConsumerRecord consumerRecord : records) {
            this.dealNotify(consumerRecord);
        }
    }

    private String getKey(String messageId) {

        return MessageFormat.format(Constants.OFC_SO_SUM_KEY, messageId);
    }

    private void dealNotify(ConsumerRecord consumerRecord) {

        final String messageId = consumerRecord.getMessageId();
        log.info("messageId : {} 开始处理so obd 汇总信息 , 参数为 {}", messageId, consumerRecord.getMessage());
        try {
            if (!redisTemplate.lock(MessageFormat.format(Constants.OFC_SO_SUM_LOCK, messageId), 30)) {
                log.info("messageId = " + messageId + "信息处理中。。。");
                return;
            }
            //具体kafka数据
            String messageStr = consumerRecord.getMessage();
            log.info("[物美 so obd 汇总信息为] messageStr :" + messageStr);
            final DmgSoObdVo dmgSoObdVo = JsonUtils.json2Obj(messageStr, DmgSoObdVo.class);
            log.info("[物美 so obd 汇总信息为] messageJson:" + JSON.toJSONString(dmgSoObdVo));

            redisTemplate.hset(Constants.OFC_SO_OBD_SUM_KEY, messageId, messageStr);
            redisTemplate.set(this.getKey(messageId), messageStr);
            // 业务处理流程 一次处理一个物美so
            int success = this.sumSoObdHandler.process(dmgSoObdVo);

            if (success > 0) {
                log.info("ofcSoObdSumJob [物美 so obd 汇总] 处理成功 messageId {}",messageId);
                redisTemplate.hdel(Constants.OFC_SO_OBD_SUM_KEY, messageId);
            }else{
                log.info("ofcSoObdSumJob [物美 so obd 汇总] 处理失败 messageId {}", messageId);
            }
        } catch (Exception e) {
            log.error("[物美 so obd 汇总信息为] 异常", e);
        }
    }

}
